package com.idea.relax.log.appender.kafka;

import ch.qos.logback.classic.AsyncAppender;
import ch.qos.logback.classic.LoggerContext;
import com.idea.relax.log.appender.ILogbackAppender;
import com.idea.relax.log.props.LogLevel;
import com.idea.relax.log.props.RelaxLogProperties;
import com.idea.relax.log.support.utils.*;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.core.Ordered;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Experimental.
 *
 * @author salad
 * @date: 2022/12/17
 * @description: Appender that pushes logs to Kafka
 */
@Slf4j
public class LoggingKafkaAppender implements ILogbackAppender {

    private final RelaxLogProperties properties;

    // Placeholder values (currently only "level") for pattern resolution.
    // NOTE(review): static and overwritten once per level inside apply() — the
    // last written value wins; confirm readers only need it during appender setup.
    private static Map<String, String> placeholderValueMap;

    private final KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Wires the appender into the current logback context immediately.
     *
     * @param properties    relax log configuration (provides the Kafka section)
     * @param kafkaTemplate template used to publish log events; may be null here,
     *                      but registration will fail fast if Kafka logging is enabled
     */
    public LoggingKafkaAppender(RelaxLogProperties properties,
                                KafkaTemplate<String, String> kafkaTemplate) {
        this.properties = properties;
        this.kafkaTemplate = kafkaTemplate;
        LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
        // NOTE(review): invoking the overridable start(...) from a constructor is
        // unsafe if this class is ever subclassed; kept for backward compatibility.
        this.start(context);
    }

    @Override
    public void start(LoggerContext context) {
        log.info("Kafka logging start.");
        apply(context);
    }

    @Override
    public void reset(LoggerContext context) {
        log.info("Kafka logging reset.");
        apply(context);
    }

    /**
     * Registers one Kafka appender per configured log level, or a single
     * ALL-level appender when no levels are configured. No-op when the
     * Kafka log section is disabled.
     *
     * @param context the logback logger context to attach appenders to
     */
    @Override
    public void apply(LoggerContext context) {
        RelaxLogProperties.KafkaLog props = properties.getKafkaLog();
        if (!props.isEnabled()) {
            return;
        }
        Set<LogLevel> logLevels = props.getLogLevels();
        placeholderValueMap = new HashMap<>(2);
        if (Func.isEmpty(logLevels)) {
            placeholderValueMap.put("level", LogLevel.ALL.getLevel().levelStr.toLowerCase());
            addLevelLogKafkaAppender(context, LogLevel.ALL, props);
        } else {
            for (LogLevel level : logLevels) {
                placeholderValueMap.put("level", level.getLevel().levelStr.toLowerCase());
                addLevelLogKafkaAppender(context, level, props);
            }
        }
        log.info("Kafka logging apply successfully.");
    }

    /**
     * Builds, starts and registers a {@link KafkaAppender} filtered to the given level.
     *
     * @param context the logback logger context
     * @param level   the level this appender accepts; callers always pass non-null
     * @param props   Kafka log configuration (topic, appender name prefix, async flag)
     * @throws NoSuchBeanDefinitionException if no KafkaTemplate was injected
     */
    private void addLevelLogKafkaAppender(LoggerContext context,
                                          LogLevel level,
                                          RelaxLogProperties.KafkaLog props) {
        // Fail fast before building anything that depends on the template.
        if (null == kafkaTemplate) {
            throw new NoSuchBeanDefinitionException("The kafkaTemplate cannot be null!");
        }
        KafkaAppender kafkaAppender = new KafkaAppender(props.getTopic(), kafkaTemplate);
        // FIX: the original ternary bound only the false branch to the level suffix,
        // so a blank prefix yielded the bare name "KAFKA_" for every level —
        // colliding appender names. The level suffix is now always appended.
        String prefix = StringUtil.isBlank(props.getAppenderNamePrefix())
                ? "KAFKA_" : props.getAppenderNamePrefix();
        String appenderName = prefix + level.getLevel().levelStr;
        kafkaAppender.setName(appenderName);
        // FIX: the original null-check on level came AFTER level had already been
        // dereferenced above, so it was dead code; both call sites pass a constant.
        kafkaAppender.addFilter(LoggingUtil.levelFilter(context, level));
        kafkaAppender.setContext(context);
        if (props.isAsyncAppender()) {
            // FIX: previously the async wrapper was started and then discarded while
            // the raw, never-started KafkaAppender was registered; logback drops
            // events routed to an unstarted appender. Start the delegate and
            // register the async wrapper instead.
            kafkaAppender.start();
            AsyncAppender asyncAppender = LoggingUtil.bindAsyncAppender(kafkaAppender, 1024, 0, false);
            asyncAppender.start();
            decideApplyAppender(context, appenderName, asyncAppender);
        } else {
            kafkaAppender.start();
            decideApplyAppender(context, appenderName, kafkaAppender);
        }
    }

    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }

}
